package com.dark.trident;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

/**
 * Trident filter that keeps a tuple when the diagnosis code of its first
 * field, parsed as a decimal integer, is less than or equal to
 * {@code MAX_KEPT_CODE}; every other tuple is dropped.
 *
 * <p>The first field of each tuple is cast to {@link DiagnosisEvent}.
 *
 * <p>Created by tengxue on 16-6-30.
 */
public class DiseaseFilter extends BaseFilter {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(DiseaseFilter.class);

    /** Highest numeric diagnosis code (inclusive) that passes the filter. */
    private static final int MAX_KEPT_CODE = 322;

    /**
     * Decides whether the given tuple passes the filter.
     *
     * @param tuple tuple whose field 0 holds the {@link DiagnosisEvent} to inspect
     * @return {@code true} when the event's diagnosis code parses to an integer
     *         no greater than {@code MAX_KEPT_CODE}; {@code false} when the code
     *         is larger, or when it is {@code null} or not a valid integer
     */
    @Override
    public boolean isKeep(TridentTuple tuple) {
        DiagnosisEvent diagnosis = (DiagnosisEvent) tuple.get(0);
        String rawCode = diagnosis.getDiagnosisCode();

        int code;
        try {
            code = Integer.parseInt(rawCode);
        } catch (NumberFormatException e) {
            // A null or non-numeric code cannot be compared against the threshold.
            // Drop the tuple instead of letting an unchecked exception escape the
            // filter because of a single malformed event.
            LOG.warn("Filtering disease [{} ] : diagnosis code is not a valid integer", rawCode);
            return false;
        }

        if (code <= MAX_KEPT_CODE) {
            LOG.info("Emitting disease [{} ] ", rawCode);
            return true;
        }

        LOG.info("Filtering disease [{} ] ", rawCode);
        return false;
    }
}
